package gu.simplemq.proton;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import gu.simplemq.Channel;
import gu.simplemq.IMessageQueueFactory;
import gu.simplemq.ISubscriber;
import gu.simplemq.exceptions.SmqUnsubscribeException;
import gu.simplemq.jms.MessageQueueFactoryImpl;

/**
 * @author guyadong
 *
 */
public class ProtonSubscriberTest implements ProtonConstants {
	private static final String TOPIC_PREIFX = "ActiveMQ.Advisory.Consumer.Topic.";
	private static final String AMQP_HOST = "192.168.10.226";

	private static JmsConnectionFactory createFactory(){
    	Map<String, String> props = Maps.newHashMap();
    	props.put("remoteURI","amqp://" + AMQP_HOST + ":5672");
    	JmsConnectionFactory factory = new JmsConnectionFactory();
       factory.setProperties(props);    
       return factory;
	}
	private static Connection conn;
	private static Session session;

	@BeforeClass
	public static void setUpBeforeClass() throws Exception {
		JmsConnectionFactory factory = createFactory();
		conn = factory.createConnection("user", "password");
		conn.setExceptionListener(new MyExceptionListener());
		conn.start();
		session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
	}

	@AfterClass
	public static void tearDownAfterClass() throws Exception {
		session.close();
		conn.close();
		//pool.close();
	}
	public void sub(Session session,String topic,MessageListener listener) throws JMSException {
		Topic activeMQTopic = session.createTopic(topic);
		MessageConsumer consumer = session.createConsumer(activeMQTopic);
		consumer.setMessageListener(listener);
	}
	private static void waitquit(){
		System.out.println("PRESS 'quit' OR 'CTRL-C' to exit");
		BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 
		try{
			while(!"quit".equalsIgnoreCase(reader.readLine())){
			}
			System.exit(0);
		} catch (IOException e) {
	
		}finally {
	
		}
	}
	@Test
	public void test1() {
		try {
			sub(session,TOPIC_PREIFX + "*", new AdvisoryListener());
			sub(session,"chat1", new LogListener());
			sub(session,"chat2", new LogListener());
			sub(session,"chat3", new LogListener());
			waitquit();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	public void test2() {
//		ImmutableMap<String,String> m = ImmutableMap.of(MQ_USERNAME,"user",MQ_PASSWORD,"password");
		ImmutableMap<String,String> m = ImmutableMap.of(MQ_URI, AMQP_SCHEMA+"://user:password@" + AMQP_HOST +":" + DEFAULT_AMQP_PORT);

		@SuppressWarnings("resource")
		IMessageQueueFactory factory = new MessageQueueFactoryImpl(ProtonRuntimeContext.PROTON_CONTEXT).init((Map)m);
		try{
			ISubscriber subscriber = factory.getSubscriber();
			sub(session,TOPIC_PREIFX + "*",new AdvisoryListener());
			Channel<String> chat1 = new LogChannel("chat1");
			Channel<String> chat2 = new LogChannel("chat2");
			Channel<String> chat3 = new LogChannel("chat3");
			subscriber.register(chat1,chat2);		
			subscriber.register(chat3);
			waitquit();
			factory.close();
		} catch (Throwable e) {
			e.printStackTrace();
		}
		//		subscriber.unsubscribe(chat1.name);
		//		subscriber.unsubscribe();
	}
	class LogChannel extends Channel<String>{

		protected LogChannel(String name) {
			super(name);
		}
		@Override
		public void onSubscribe(String t) throws SmqUnsubscribeException {
			logger.info(name + " msg:" + t);				
		}
	}
	private static class AdvisoryListener implements MessageListener{
		@Override
		public void onMessage(Message message) {
			try {
				logger.info("dest {}",message.getJMSDestination());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
	private static class LogListener implements MessageListener{
		private String textOf(Message message) throws JMSException{
			if(message instanceof TextMessage){
				return ((TextMessage) message).getText();
			}
			if(message instanceof BytesMessage){
				BytesMessage bytesMessage = (BytesMessage)message;
				byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
				bytesMessage.readBytes(buf);
				return new String(buf);
			}
			throw new IllegalArgumentException(String.format("INVALID message type,%s,%s required",
					TextMessage.class.getName(),
					BytesMessage.class.getName()));
		}
		@Override
		public void onMessage(Message message) {
			try {
				logger.info("dest {}:{}",message.getJMSDestination(),textOf(message));
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
